系統的 Log 除了基本的 Request Log 及 Error Log 之外,特定的功能也會有記錄 Log 的需求,例如使用者登入時,需要記錄來源 IP、裝置及登入失敗的原因;寄送訊息通知時,記錄訊息類型、寄送方式(channel)及第三方服務的回應來判斷是否成功寄送。即使這些 Log 的資料欄位及寫入時機不相同,但我們仍需要有統一的方式處理。
LogMessage
資料類別,再根據寫入的目的地,轉換為特定的資料格式,例如 JSONLogWriter
寫到任意目的地,例如資料庫,或是轉為 JSON 寫入到 AWS Kinesis Data Firehose。LogMessage
的 LogType
決定對應的 LogWriter
寫入 log 到特定的目的地LogWriter
,只要實作 LogMessage
資料類別就好那為什麼不使用 Logging Framework 就好呢? 例如 Logback
AwsKinesisLogWriter
使用 Kinesis Firehose SDK 傳送 JSON 到 ElasticaSearch (我沒有在 EC2 安裝 Kinesis Agent 轉一手處理 log 資料)。AWS EKK 服務介紹可以看這篇文章 From ELK Stack to EKK: Aggregating and Analyzing Apache Logs with Amazon Elasticsearch Service, Amazon Kinesis, and Kibana
每種功能的 Log 資料類別都要繼承 LogMessage
,至少包含以下3個欄位
各種 Log 資料類別的程式碼在此
abstract class LogMessage : IdentifiableObject<UUID>() {
abstract val occurAt: Instant
abstract val logType: String
abstract val logLevel: LogLevel
fun toJson(): JsonElement = json.encodeToJsonElement(this)
}
@Serializable
data class LoginLog(
@Serializable(with = UUIDSerializer::class) val userId: UUID,
val resultCode: LoginResultCode,
@Serializable(with = InstantSerializer::class) override val occurAt: Instant,
val project: String,
val source: PrincipalSource,
val tenantId: TenantId? = null,
val clientId: String? = null,
val clientVersion: String?,
val ip: String? = null,
val sid: String? = null
) : LogMessage() {
@Serializable(with = UUIDSerializer::class)
override val id: UUID = UUID.randomUUID()
override val logType: String = LOG_TYPE
override val logLevel: LogLevel = Log_Level
companion object {
const val LOG_TYPE = "login"
private val Log_Level = LogLevel.INFO
}
}
目前 LogMessage 可以使用以下 LogWriter 寫到不同目的地
interface LogWriter {
fun write(message: LogMessage)
fun shutdown() {}
}
LogMessageDispatcher 也實作 LogWriter 介面,當呼叫 write(message: LogMessage)
方法時,會根據 LogMessage 的 LogType 找出事先已註冊的 LogWriter,再呼叫 write(message: LogMessage)
方法寫入 log
class LogMessageDispatcher(private val defaultLogWriter: LogWriter? = null) : LogWriter {
private val logWriters: MutableMap<String, LogWriter> = mutableMapOf()
fun register(logType: String, logWriter: LogWriter) {
require(!logWriters.containsKey(logType))
logWriters[logType] = logWriter
}
override fun write(message: LogMessage) {
val logWriter = logWriters[message.logType] ?: defaultLogWriter ?: throw InternalServerException(
InfraResponseCode.SERVER_CONFIG_ERROR, "logType ${message.logType} logWriter is not registered"
)
logWriter.write(message)
}
override fun shutdown() {
logWriters.values.forEach { it.shutdown() }
}
}
這裡使用 decorator pattern 的手法,讓 LogMessageDispatcher
及 LogMessageCoroutineActor
都實作 LogWriter 介面,所以呼叫 write(message: LogMessage)
方法時,會 delegate 到內部的 LogWriter,執行順序會是 LogMessageCoroutineActor → LogMessageDispatcher → XXXLogWriter
。LogMessageCoroutineActor
內部是以 Coroutine SendChannel 達成非同步執行,更多 CoroutineActor 的實作細節,請參考 [Day 21] 使用 Coroutine SendChannel 處理非同步工作
class LogMessageCoroutineActor(
coroutineActorConfig: CoroutineActorConfig,
private val logWriter: LogWriter
) : LogWriter {
private val logger = KotlinLogging.logger {}
private val actorName = "LogWriterActor"
private val actor: CoroutineActor<LogMessage> = CoroutineActor(
actorName, Channel.UNLIMITED,
coroutineActorConfig, Dispatchers.IO,
this::execute
)
override fun write(message: LogMessage) {
actor.sendToUnlimitedChannel(message, InfraResponseCode.LOG_ERROR) // non-blocking by Channel.UNLIMITED
}
private fun execute(message: LogMessage) {
try {
logWriter.write(message)
} catch (e: Throwable) {
logger.error("$actorName execute error => $message", e)
}
}
override fun shutdown() {
actor.close()
logWriter.shutdown()
}
}
Logging Plugin 會根據 application.conf 設定檔,初始化建立對應的 LogWriter,然後註冊至 Koin DI。因為 Request Log 及 Error Log 是系統最基本的 LogType,所以我會在 Logging Plugin 裡面進行設定。以下面的設定值為例,Request Log 會使用 AwsKinesisLogWriter
,Error Log 則是使用 ErrorLogDBWriter
logging {
request {
enabled = true
destination = "AwsKinesis" # File(default), Database, AwsKinesis
includeHeaders = false
includeQueryString = false
includeResponseBody = false
includeGetMethod = false
excludePaths = ["/ops/sysstat/healthCheck"]
excludeRequestBodyPaths = ["/login", "/myPassword"]
}
error {
enabled = true
destination = "Database" # File(default), Database, AwsKinesis
// TODO integrate notification feature to notify OpsTeam by email, sms...
}
writer {
awsKinesis {
streamName = "logging"
nettyClient {
#maxConcurrency = 50 # => aws default value = 50
#maxPendingConnectionAcquires = 10000 => aws default value = 10000
#maxIdleConnectionTimeout = 60s => aws default value = 60s
}
threadPool {
fixedPoolSize = 3
}
}
}
asyncExecutor {
coroutineActor {
coroutines = 1
dispatcher {
fixedPoolSize = 1
}
}
}
}
下面是 Logging Plugin 的內部程式碼,我會先建立 LogMessageDispatcher
物件,後續每種 LogType 的 LogWriter 要註冊到 LogMessageDispatcher 裡面,這樣子寫入 log 時就可以根據 LogMessage 的 LogType,取出對應的 LogWriter 執行寫入動作。如果 LogType 沒有設定對應的 LogWriter,就會使用預設的 FileLogWriter 寫到檔案。
pipeline.koin {
modules(
module(createdAtStart = true) {
single { loggingConfig }
val fileLogWriter = FileLogWriter()
single { fileLogWriter }
val logMessageDispatcher = LogMessageDispatcher(FileLogWriter())
single { logMessageDispatcher }
val awsKinesisLogWriter = loggingConfig.writer?.awsKinesis?.let {
AwsKinesisLogWriter(it, serverConfig)
}
if (awsKinesisLogWriter != null)
single { awsKinesisLogWriter }
if (loggingConfig.request.enabled) {
val requestLogWriter = when (loggingConfig.request.destination) {
LogDestination.File -> fileLogWriter
LogDestination.Database -> RequestLogDBWriter()
LogDestination.AwsKinesis -> awsKinesisLogWriter ?: throw InternalServerException(
InfraResponseCode.SERVER_CONFIG_ERROR, "AwsKinesisLogWriter is not configured"
)
}
logMessageDispatcher.register(RequestLog.LOG_TYPE, requestLogWriter)
}
if (loggingConfig.error.enabled) {
val errorLogWriter = when (loggingConfig.error.destination) {
LogDestination.File -> fileLogWriter
LogDestination.Database -> ErrorLogDBWriter()
LogDestination.AwsKinesis -> awsKinesisLogWriter ?: throw InternalServerException(
InfraResponseCode.SERVER_CONFIG_ERROR, "kinesisLogWriter is not configured"
)
}
logMessageDispatcher.register(ErrorLog.LOG_TYPE, errorLogWriter)
}
val logWriter = loggingConfig.asyncExecutor?.let {
LogMessageCoroutineActor(it.coroutineActor, logMessageDispatcher)
} ?: logMessageDispatcher
single { logWriter }
KoinApplicationShutdownManager.register { logWriter.shutdown() }
}
)
}
先安裝 Logging Plugin 初始化 LogMessageDispatcher 之後,SessionAuth Plugin 就可以透過 Koin DI 取出 LogMessageDispatcher
物件,再註冊 LoginLog 使用 AwsKinesisLogWriter
。當使用者登入、登出時,就可以使用 AwsKinesisLogWriter
寫入 LoginLog。
sessionAuth {
storageType = "Redis" # Redis
redisKeyExpiredNotification = true
session {
expireDuration = 1d
extendDuration = 15m
}
logging {
enabled = true
destination = "AwsKinesis" # File(default), Database, AwsKinesis
}
}
val loginLogWriter = when (authConfig.logging.destination) {
LogDestination.File -> pipeline.get<FileLogWriter>()
LogDestination.Database -> LoginLogDBWriter()
LogDestination.AwsKinesis -> pipeline.get<AwsKinesisLogWriter>()
}
val logMessageDispatcher = pipeline.get<LogMessageDispatcher>()
logMessageDispatcher.register(LoginLog.LOG_TYPE, loginLogWriter)
系統錯誤可分為2種,一種是處理外部請求的錯誤,我們可以在 Global Exception Handler ,先透過 Koin DI 取得 LogWriter,再把 Exception 轉為 ErrorLog 物件寫入。
install(StatusPages) {
val loggingConfig = get<LoggingConfig>()
val logWriter = get<LogWriter>()
val responseCreator = get<I18nResponseCreator>()
exception<Throwable> { cause ->
val e = ExceptionUtils.wrapException(cause)
ExceptionUtils.writeLogToFile(e, call)
if (loggingConfig.error.enabled) {
logWriter.write(ErrorLog.request(e, call))
}
val errorResponse = responseCreator.createErrorResponse(e, call)
call.respond(errorResponse)
}
}
另一種情況是系統內部背景排程、執行非同步工作的錯誤。例如 DBAsyncTaskCoroutineActor 非同步寫入資料庫時,我們要 catch Exception 轉為 ErrorLog 物件再寫入,差別在於這裡沒有外部請求的 ApplicationCall 物件
private fun execute(task: DBAsyncTask) {
try {
transaction {
task.block(this)
}
} catch (e: Throwable) {
val errorMsg = "$actorName execute error"
logger.error("errorMsg => $task", e)
logWriter.write(
ErrorLog.internal(
InternalServerException(
InfraResponseCode.DB_ASYNC_TASK_ERROR, errorMsg, e,
mapOf("taskId" to task.id, "taskType" to task.type)
),
actorName, task.id.toString()
)
)
}
}
今天我說明了如何建立系統的 Logging 機制處理不同類型的 log,另一種類似的需求是系統也需要對各種非同步工作採用一致的方式處理。例如今天提到的 LogMessageCoroutineActor
非同步寫入 log,還有 DBAsyncTaskCoroutineActor
非同步寫入資料庫。明天我會說明如何把 Coroutine SendChannel 包裝成不同功能的 CoroutineActor 執行非同步工作。